/*

其中设备码: box_9SLAD6CH订阅列表
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/das/cmd/#
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/das/state/request
DEVICE/BASE/MAGUS/cloud/9SLAD6CHDEVICE/license/update
DEVICE/BASE/MAGUS/cloud/9SLAD6CHDEVICE/reboot
DEVICE/BASE/MAGUS/cloud/9SLAD6CHDEVICE/time/update
DEVICE/BASE/MAGUS/cloud/9SLAD6CHDEVICEinfo/config/request
DEVICE/BASE/MAGUS/cloud/9SLAD6CHDEVICEinfo/monitor/driver_list
DEVICE/BASE/MAGUS/cloud/9SLAD6CHDEVICEinfo/monitor/request
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/log/log_list
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/log/log_pull
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/log/monitor
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/message/his_alarm/request
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/ota/upgrade
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/point/event/control
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/point/event/his_recall
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/register
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/sync/full_request
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/sync/request

aim:
模拟设备码前缀为 CODE_0 ~ CODE_999

*/

package main

import (
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"os"
	"strconv"
	"time"
)


// Package-level state shared by main, sub, pub and the message handler.
var (
	// f is the sample log file opened by main.
	f *os.File

	// ready is signalled by sub once its clients are connected; pub waits
	// on it before it starts publishing.
	ready chan struct{}

	// Alternative broker address kept for reference:
	//   tcp://192.168.40.43:1883

	// addr_pub is the broker endpoint the publisher connects to.
	addr_pub = "tcp://127.0.0.1:1885"

	// addr_sub is the broker endpoint the subscribers connect to.
	addr_sub = "tcp://127.0.0.1:1884"
)

// f2 is the message handler used for every subscription. It expects the
// payload to be a decimal Unix timestamp in nanoseconds (the format pub
// sends) and prints how many milliseconds have elapsed since that timestamp.
// Payloads that do not parse are reported and skipped.
var f2 mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	// Parse as base 10 with an explicit 64-bit size. A bitSize of 0 means the
	// platform int, which cannot hold nanosecond timestamps on 32-bit builds,
	// and base 0 would reinterpret prefixed or zero-padded payloads.
	sentNanos, err := strconv.ParseInt(string(msg.Payload()), 10, 64)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Nanoseconds to milliseconds (integer division).
	elapsedMs := (time.Now().UnixNano() - sentNanos) / 1e6
	s := "hao秒: " + strconv.FormatInt(elapsedMs, 10) + "\n"

	// s already ends in a newline, so each sample is followed by a blank line.
	// Writing the sample to the log file f is currently disabled.
	fmt.Println(s)
}

// sub opens 300 subscriber connections to addr_sub with client IDs
// CODE_0 .. CODE_299. Each client subscribes to the shared test topic and to
// its own device topics every time it connects, with f2 as the handler.
// Once all clients have connected, sub signals the publisher through ready.
// It panics if any client fails to connect.
func sub() {
	const (
		staticId    = "CODE_"
		clientCount = 300
	)

	for i := 0; i < clientCount; i++ {
		// Resolve the device code once per iteration, outside the handler.
		// Before Go 1.22 the loop variable is shared by all iterations, and
		// the on-connect handler can run after the loop has moved on (for
		// example on an automatic reconnect), so reading i inside the handler
		// could subscribe this client to another client's topics.
		code := staticId + strconv.Itoa(i)

		opts := mqtt.NewClientOptions().AddBroker(addr_sub).SetKeepAlive(time.Second * 120)
		opts.SetClientID(code)
		opts.SetAutoReconnect(true)
		opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
			fmt.Println("lost conn", code, err)
		})
		opts.SetOnConnectHandler(func(c mqtt.Client) {
			fmt.Println("good conn")
			subscribeDevice(c, code)
		})

		c := mqtt.NewClient(opts)
		token := c.Connect()
		if token.Wait() && token.Error() != nil {
			panic(token.Error())
		}

		// Brief pause between connection attempts, then print progress.
		time.Sleep(time.Millisecond)
		fmt.Println(i)
	}

	ready <- struct{}{}
}

// subscribeDevice subscribes c, at QoS 0 with f2 as the handler, to the shared
// test topic and to every device topic of code, then reports any subscription
// that fails or is not acknowledged in time.
func subscribeDevice(c mqtt.Client, code string) {
	const subscribeTimeout = 10 * time.Second

	prefix := "/DEVICE/BASE/MAGUS/cloud/" + code
	// NOTE(review): several entries have no "/" between the device code and
	// "DEVICE"; they are kept exactly as originally written. Confirm whether
	// that is intended.
	topics := []string{
		"/dev/1/2/3/4/5",
		prefix + "/das/cmd",
		prefix + "/das/state/request",
		prefix + "DEVICE/license/update",
		prefix + "DEVICE/reboot",
		prefix + "DEVICE/time/update",
		prefix + "DEVICEinfo/config/request",
		prefix + "DEVICEinfo/monitor/driver_list",
		prefix + "DEVICEinfo/monitor/request",
		prefix + "/log/log_list",
		prefix + "/log/log_pull",
		prefix + "/log/monitor",
		prefix + "/message/his_alarm/request",
		prefix + "/ota/upgrade",
		prefix + "/point/event/control",
		prefix + "/point/event/his_recall",
		prefix + "/register",
		prefix + "/sync/full_request",
		prefix + "/sync/request",
	}

	// Issue every SUBSCRIBE first so the requests are not serialized behind
	// each other's acknowledgements, then check the results.
	tokens := make([]mqtt.Token, len(topics))
	for i, topic := range topics {
		tokens[i] = c.Subscribe(topic, 0, f2)
	}
	for i, token := range tokens {
		if !token.WaitTimeout(subscribeTimeout) {
			fmt.Println("subscribe timeout", code, topics[i])
			continue
		}
		if err := token.Error(); err != nil {
			fmt.Println("subscribe failed", code, topics[i], err)
		}
	}
}

// pub waits until sub signals readiness, prints how long that took, and then
// connects a single publisher client ("pub_once") to addr_pub. From then on
// it loops forever: once per second it publishes the current Unix time in
// nanoseconds, as decimal text, at QoS 0 to one device topic and to the shared
// test topic. It panics if the publisher fails to connect.
func pub() {
	start := time.Now()
	<-ready
	elapsed := time.Since(start)
	fmt.Println("时间: ", elapsed.String())

	fmt.Println("开始成产数据")

	options := mqtt.NewClientOptions()
	options.AddBroker(addr_pub)
	options.SetClientID("pub_once")
	options.SetAutoReconnect(true)

	client := mqtt.NewClient(options)
	if connectToken := client.Connect(); connectToken.Wait() && connectToken.Error() != nil {
		panic(connectToken.Error())
	}

	// Both topics receive the same timestamp payload on every tick.
	topics := [...]string{
		"/DEVICE/BASE/MAGUS/cloud/CODE_10/das/cmd",
		"/dev/1/2/3/4/5",
	}

	for {
		time.Sleep(time.Second)

		payload := strconv.AppendInt(nil, time.Now().UnixNano(), 10)
		for _, topic := range topics {
			client.Publish(topic, 0, false, payload)
		}
	}
}

// main opens the sb.log sample log, starts the subscriber and publisher
// goroutines, and then blocks forever.
func main() {
	var err error
	f, err = os.OpenFile("sb.log", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0660)
	if err != nil {
		// Nothing in this file currently writes to f (the write in f2 is
		// commented out), so report the problem and carry on without the log
		// instead of aborting the test run.
		fmt.Println("open sb.log:", err)
	} else {
		// Flush the log file to disk once per second.
		go func() {
			for {
				time.Sleep(time.Second)
				// Best-effort flush; a failed sync is retried on the next tick.
				_ = f.Sync()
			}
		}()
	}

	// Buffered so sub can signal readiness even if pub is not yet receiving.
	ready = make(chan struct{}, 1)

	go sub()
	go pub()

	// Block forever; the worker goroutines run until the process is killed.
	select {}
}
